//! Provides the `BeaconProcessor`, a multi-threaded processor for messages received on the network
//! that need to be processed by the `BeaconChain`.
//!
//! Uses `tokio` tasks (instead of raw threads) to provide the following tasks:
//!
//! - A "manager" task, which either spawns worker tasks or enqueues work.
//! - One or more "worker" tasks which perform time-intensive work on the `BeaconChain`.
//! - A task managing the scheduling of work that needs to be re-processed.
//!
//! ## Purpose
//!
//! The purpose of the `BeaconProcessor` is to provide two things:
//!
//! 1. Moving long-running, blocking tasks off the main `tokio` executor.
//! 2. A fixed-length buffer for consensus messages.
//!
//! (1) ensures that we don't clog up the networking stack with long-running tasks, potentially
//! causing timeouts. (2) means that we can easily and explicitly reject messages when we're
//! overloaded and also distribute load across time.
//!
//! ## Detail
//!
//! There is a single "manager" thread which listens to three event channels. These events are
//! either:
//!
//! - A new parcel of work (work event).
//! - Indication that a worker has finished a parcel of work (worker idle).
//! - Work that has become ready for re-processing (work event).
//!
//! Then, there is a maximum of `n` "worker" blocking threads, where `n` is the CPU count.
//!
//! Whenever the manager receives a new parcel of work, it is either:
//!
//! - Provided to a newly-spawned worker task (if we are not already at `n` workers).
//! - Added to a queue.
//!
//! Whenever the manager receives a notification that a worker has finished a parcel of work, it
//! checks the queues to see if there are more parcels of work that can be spawned in a new worker
//! task.

use crate::work_reprocessing_queue::{
    QueuedBackfillBatch, QueuedColumnReconstruction, QueuedGossipBlock, ReprocessQueueMessage,
};
use futures::stream::{Stream, StreamExt};
use futures::task::Poll;
use lighthouse_network::{MessageId, NetworkGlobals, PeerId};
use logging::TimeLatch;
use logging::crit;
use parking_lot::Mutex;
pub use scheduler::work_reprocessing_queue;
use serde::{Deserialize, Serialize};
use slot_clock::SlotClock;
use std::cmp;
use std::collections::{HashSet, VecDeque};
use std::fmt;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Context;
use std::time::{Duration, Instant};
use strum::IntoStaticStr;
use task_executor::{RayonPoolType, TaskExecutor};
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::TrySendError;
use tracing::{debug, error, trace, warn};
use types::{
    BeaconState, ChainSpec, EthSpec, Hash256, RelativeEpoch, SignedAggregateAndProof,
    SingleAttestation, Slot, SubnetId,
};
use work_reprocessing_queue::IgnoredRpcBlock;
use work_reprocessing_queue::{
    QueuedAggregate, QueuedLightClientUpdate, QueuedRpcBlock, QueuedUnaggregate, ReadyWork,
    spawn_reprocess_scheduler,
};

mod metrics;
pub mod scheduler;

/// The maximum size of the channel for work events to the `BeaconProcessor`.
///
/// Setting this too low will cause consensus messages to be dropped.
const DEFAULT_MAX_WORK_EVENT_QUEUE_LEN: usize = 16_384;

/// The maximum size of the channel for idle events to the `BeaconProcessor`.
///
/// Setting this too low will prevent new workers from being spawned. It *should* only need to be
/// set to the CPU count, but we set it high to be safe.
const MAX_IDLE_QUEUE_LEN: usize = 16_384;

/// The maximum size of the channel for re-processing work events.
///
/// Sized at 3/4 of the main work-event queue so re-processed work cannot dominate fresh work.
const DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN: usize = 3 * DEFAULT_MAX_WORK_EVENT_QUEUE_LEN / 4;

/// Over-provision queues based on active validator count by some factor. The beacon chain has
/// strict churns that prevent the validator set size from changing rapidly. By over-provisioning
/// slightly, we don't need to adjust the queues during the lifetime of a process.
const ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT: usize = 110;

/// Minimum size of dynamically sized queues. Due to integer division we don't want 0 length queues
/// as the processor won't process that message type. 128 is an arbitrary value >= 1 that
/// seems reasonable.
const MIN_QUEUE_LEN: usize = 128;

/// Maximum number of queued items that will be stored before dropping them
pub struct BeaconProcessorQueueLengths {
    // Attestation/aggregate queues. The two plain `attestation` lengths are derived
    // from the active validator count in `from_state`; the rest are fixed.
    aggregate_queue: usize,
    attestation_queue: usize,
    unknown_block_aggregate_queue: usize,
    unknown_block_attestation_queue: usize,
    // Sync-committee message queues.
    sync_message_queue: usize,
    sync_contribution_queue: usize,
    // Gossiped beacon operations.
    gossip_voluntary_exit_queue: usize,
    gossip_proposer_slashing_queue: usize,
    gossip_attester_slashing_queue: usize,
    unknown_light_client_update_queue: usize,
    // Block / blob / data-column import queues (RPC and gossip).
    rpc_block_queue: usize,
    rpc_blob_queue: usize,
    rpc_custody_column_queue: usize,
    column_reconstruction_queue: usize,
    chain_segment_queue: usize,
    backfill_chain_segment: usize,
    gossip_block_queue: usize,
    gossip_blob_queue: usize,
    gossip_data_column_queue: usize,
    delayed_block_queue: usize,
    // Req/resp request-handling queues (bb = blocks, blb = blobs, dcb = data columns;
    // presumably by-range/by-root — confirm against `WorkType`).
    status_queue: usize,
    bbrange_queue: usize,
    bbroots_queue: usize,
    blbroots_queue: usize,
    blbrange_queue: usize,
    dcbroots_queue: usize,
    dcbrange_queue: usize,
    gossip_bls_to_execution_change_queue: usize,
    // Light-client gossip and request queues.
    lc_gossip_finality_update_queue: usize,
    lc_gossip_optimistic_update_queue: usize,
    lc_bootstrap_queue: usize,
    lc_rpc_optimistic_update_queue: usize,
    lc_rpc_finality_update_queue: usize,
    lc_update_range_queue: usize,
    // HTTP API work, split by priority (P0 before P1).
    api_request_p0_queue: usize,
    api_request_p1_queue: usize,
}

impl BeaconProcessorQueueLengths {
    /// Derive queue lengths from `state`.
    ///
    /// Most queues use fixed capacities. The attestation queues are sized proportionally
    /// to the active validator count, over-provisioned by
    /// `ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT` and floored at `MIN_QUEUE_LEN`.
    ///
    /// Returns an error string if the active validator indices cannot be computed.
    pub fn from_state<E: EthSpec>(
        state: &BeaconState<E>,
        spec: &ChainSpec,
    ) -> Result<Self, String> {
        // Prefer the cached committee indices; fall back to recomputing them when the
        // cache for the current epoch has not been built.
        let validator_count =
            match state.get_cached_active_validator_indices(RelativeEpoch::Current) {
                Ok(indices) => indices.len(),
                Err(_) => state
                    .get_active_validator_indices(state.current_epoch(), spec)
                    .map_err(|e| format!("Error computing active indices: {:?}", e))?
                    .len(),
            };
        // Over-provision so the queues need no resizing as the validator set slowly grows.
        let provisioned_count =
            (ACTIVE_VALIDATOR_COUNT_OVERPROVISION_PERCENT * validator_count) / 100;
        let slots_per_epoch = E::slots_per_epoch() as usize;
        // Capacity for a full slot's worth of attestations if subscribed to all subnets.
        let per_slot_attestation_capacity =
            (provisioned_count / slots_per_epoch).max(MIN_QUEUE_LEN);

        Ok(Self {
            aggregate_queue: 4096,
            unknown_block_aggregate_queue: 1024,
            attestation_queue: per_slot_attestation_capacity,
            unknown_block_attestation_queue: per_slot_attestation_capacity,
            sync_message_queue: 2048,
            sync_contribution_queue: 1024,
            gossip_voluntary_exit_queue: 4096,
            gossip_proposer_slashing_queue: 4096,
            gossip_attester_slashing_queue: 4096,
            unknown_light_client_update_queue: 128,
            rpc_block_queue: 1024,
            rpc_blob_queue: 1024,
            // We don't request more than `PARENT_DEPTH_TOLERANCE` (32) lookups, so we can limit
            // this queue size. With 48 max blobs per block, each column sidecar list could be up to 12MB.
            rpc_custody_column_queue: 64,
            column_reconstruction_queue: 1,
            chain_segment_queue: 64,
            backfill_chain_segment: 64,
            gossip_block_queue: 1024,
            gossip_blob_queue: 1024,
            gossip_data_column_queue: 1024,
            delayed_block_queue: 1024,
            status_queue: 1024,
            bbrange_queue: 1024,
            bbroots_queue: 1024,
            blbroots_queue: 1024,
            blbrange_queue: 1024,
            dcbroots_queue: 1024,
            dcbrange_queue: 1024,
            gossip_bls_to_execution_change_queue: 16384,
            lc_gossip_finality_update_queue: 1024,
            lc_gossip_optimistic_update_queue: 1024,
            lc_bootstrap_queue: 1024,
            lc_rpc_optimistic_update_queue: 512,
            lc_rpc_finality_update_queue: 512,
            lc_update_range_queue: 512,
            api_request_p0_queue: 1024,
            api_request_p1_queue: 1024,
        })
    }
}

/// The name of the manager tokio task.
const MANAGER_TASK_NAME: &str = "beacon_processor_manager";

/// The name of the worker tokio tasks.
const WORKER_TASK_NAME: &str = "beacon_processor_worker";

/// The `MAX_..._BATCH_SIZE` variables define how many attestations can be included in a single
/// batch.
///
/// Choosing these values is difficult since there is a trade-off between:
///
/// - It is faster to verify one large batch than multiple smaller batches.
/// - "Poisoning" attacks have a larger impact as the batch size increases.
///
/// Poisoning occurs when an invalid signature is included in a batch of attestations. A single
/// invalid signature causes the entire batch to fail. When a batch fails, we fall-back to
/// individually verifying each attestation signature.
const DEFAULT_MAX_GOSSIP_ATTESTATION_BATCH_SIZE: usize = 64;
const DEFAULT_MAX_GOSSIP_AGGREGATE_BATCH_SIZE: usize = 64;

/// Unique IDs used for metrics and testing.
///
/// NOTE(review): presumably these are sent on the optional work-journal channel by the
/// manager loop — confirm in `spawn_manager`.
pub const WORKER_FREED: &str = "worker_freed";
pub const NOTHING_TO_DO: &str = "nothing_to_do";

/// Runtime configuration for the `BeaconProcessor`: worker cap, queue bounds and batch sizes.
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct BeaconProcessorConfig {
    /// Maximum number of worker tasks running at any one time.
    pub max_workers: usize,
    /// Capacity of the inbound work-event channel.
    pub max_work_event_queue_len: usize,
    /// Capacity of the scheduled (re-processing) work channel.
    pub max_scheduled_work_queue_len: usize,
    /// Maximum unaggregated attestations verified in one gossip batch.
    pub max_gossip_attestation_batch_size: usize,
    /// Maximum aggregates verified in one gossip batch.
    pub max_gossip_aggregate_batch_size: usize,
    /// Whether to rate-limit backfill sync batch processing.
    pub enable_backfill_rate_limiting: bool,
}

impl Default for BeaconProcessorConfig {
    /// Sensible defaults: one worker per logical CPU (never fewer than one) plus the
    /// compile-time queue and batch limits defined above; backfill rate-limiting on.
    fn default() -> Self {
        let worker_count = num_cpus::get().max(1);
        Self {
            max_workers: worker_count,
            max_work_event_queue_len: DEFAULT_MAX_WORK_EVENT_QUEUE_LEN,
            max_scheduled_work_queue_len: DEFAULT_MAX_SCHEDULED_WORK_QUEUE_LEN,
            max_gossip_attestation_batch_size: DEFAULT_MAX_GOSSIP_ATTESTATION_BATCH_SIZE,
            max_gossip_aggregate_batch_size: DEFAULT_MAX_GOSSIP_AGGREGATE_BATCH_SIZE,
            enable_backfill_rate_limiting: true,
        }
    }
}

/// The channels necessary to instantiate a `BeaconProcessor`.
pub struct BeaconProcessorChannels<E: EthSpec> {
    /// Sending half, handed to producers of `WorkEvent`s.
    pub beacon_processor_tx: BeaconProcessorSend<E>,
    /// Receiving half, consumed by the `BeaconProcessor` manager task.
    pub beacon_processor_rx: mpsc::Receiver<WorkEvent<E>>,
}

impl<E: EthSpec> BeaconProcessorChannels<E> {
    /// Create a bounded work-event channel pair, sized by `config.max_work_event_queue_len`.
    pub fn new(config: &BeaconProcessorConfig) -> Self {
        let (work_tx, work_rx) = mpsc::channel(config.max_work_event_queue_len);
        Self {
            beacon_processor_tx: BeaconProcessorSend(work_tx),
            beacon_processor_rx: work_rx,
        }
    }
}

impl<E: EthSpec> Default for BeaconProcessorChannels<E> {
    /// Equivalent to `Self::new` with the default `BeaconProcessorConfig`.
    fn default() -> Self {
        let config = BeaconProcessorConfig::default();
        Self::new(&config)
    }
}

/// A simple first-in-first-out queue with a maximum length.
///
/// Items pushed while the queue is full are dropped (see `push`).
struct FifoQueue<T> {
    // Backing double-ended buffer; push_back/pop_front gives FIFO order.
    queue: VecDeque<T>,
    // Maximum number of items held before new pushes are dropped.
    max_length: usize,
}

impl<T> FifoQueue<T> {
    /// Create a new, empty queue with the given length.
    pub fn new(max_length: usize) -> Self {
        Self {
            queue: VecDeque::default(),
            max_length,
        }
    }

    /// Add a new item to the queue.
    ///
    /// Drops `item` (logging an error) if the queue is full.
    pub fn push(&mut self, item: T, item_desc: &str) {
        // `>=` rather than `==` so the bound still holds even if the queue were ever
        // (incorrectly) grown past `max_length`.
        if self.queue.len() >= self.max_length {
            error!(
                msg = "the system has insufficient resources for load",
                queue_len = self.max_length,
                queue = item_desc,
                "Work queue is full"
            )
        } else {
            self.queue.push_back(item);
        }
    }

    /// Remove the next item from the queue.
    pub fn pop(&mut self) -> Option<T> {
        self.queue.pop_front()
    }

    /// Returns the current length of the queue.
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    /// Returns `true` if the queue contains no items.
    ///
    /// Provided alongside `len` per the `len_without_is_empty` lint.
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }
}

/// A simple last-in-first-out queue with a maximum length.
///
/// When full, pushing evicts the oldest item (the back of the queue); see `push`.
struct LifoQueue<T> {
    // Backing double-ended buffer; push_front/pop_front gives LIFO order.
    queue: VecDeque<T>,
    // Maximum number of items held; the oldest is evicted to make room.
    max_length: usize,
}

impl<T> LifoQueue<T> {
    /// Create a new, empty queue with the given length.
    pub fn new(max_length: usize) -> Self {
        Self {
            queue: VecDeque::default(),
            max_length,
        }
    }

    /// Add a new item to the front of the queue.
    ///
    /// If the queue is full, the item at the back of the queue is dropped.
    pub fn push(&mut self, item: T) {
        // Use `is_full` (`>=`) rather than a separate `==` comparison so the eviction
        // bound matches the fullness check exactly.
        if self.is_full() {
            self.queue.pop_back();
        }
        self.queue.push_front(item);
    }

    /// Remove the next item from the queue.
    pub fn pop(&mut self) -> Option<T> {
        self.queue.pop_front()
    }

    /// Returns `true` if the queue is full.
    pub fn is_full(&self) -> bool {
        self.queue.len() >= self.max_length
    }

    /// Returns the current length of the queue.
    pub fn len(&self) -> usize {
        self.queue.len()
    }

    /// Returns `true` if the queue contains no items.
    ///
    /// Provided alongside `len` per the `len_without_is_empty` lint.
    pub fn is_empty(&self) -> bool {
        self.queue.is_empty()
    }
}

/// A handle that removes the associated `entry` from its `DuplicateCache` when dropped.
///
/// Holding the handle marks the block root as "in progress"; dropping it (cleanly or via
/// an unclean worker shutdown) releases the entry, keeping the cache consistent.
pub struct DuplicateCacheHandle {
    // The block root this handle guards.
    entry: Hash256,
    // The cache the entry is removed from on drop.
    cache: DuplicateCache,
}

impl Drop for DuplicateCacheHandle {
    // Release the guarded entry so the same block root can be processed again later.
    fn drop(&mut self) {
        self.cache.remove(&self.entry);
    }
}

/// A simple cache for detecting duplicate block roots across multiple threads.
#[derive(Clone, Default)]
pub struct DuplicateCache {
    // Shared set of block roots currently being processed.
    inner: Arc<Mutex<HashSet<Hash256>>>,
}

impl DuplicateCache {
    /// Checks if the given block_root exists and inserts it into the cache if
    /// it doesn't exist.
    ///
    /// Returns `Some(DuplicateCacheHandle)` if the block_root was newly inserted and
    /// `None` if it was already present.
    ///
    /// The handle removes the entry again when dropped, so unclean shutdowns in worker
    /// tasks cannot leave inconsistent state in the cache.
    pub fn check_and_insert(&self, block_root: Hash256) -> Option<DuplicateCacheHandle> {
        // `HashSet::insert` returns `true` only when the value was previously absent.
        let newly_inserted = self.inner.lock().insert(block_root);
        newly_inserted.then(|| DuplicateCacheHandle {
            entry: block_root,
            cache: self.clone(),
        })
    }

    /// Remove the given block_root from the cache.
    pub fn remove(&self, block_root: &Hash256) {
        self.inner.lock().remove(block_root);
    }
}

/// An event to be processed by the manager task.
#[derive(Debug)]
pub struct WorkEvent<E: EthSpec> {
    /// If `true`, this event may be discarded while the node is syncing
    /// (NOTE(review): enforcement happens in the manager loop — confirm there).
    pub drop_during_sync: bool,
    /// The work to be performed.
    pub work: Work<E>,
}

impl<E: EthSpec> WorkEvent<E> {
    /// Get a representation of the type of work this `WorkEvent` contains.
    pub fn work_type(&self) -> WorkType {
        self.work.to_type()
    }

    /// Get a `str` representation of the type of work this `WorkEvent` contains.
    pub fn work_type_str(&self) -> &'static str {
        let work_type = self.work_type();
        work_type.into()
    }
}

impl<E: EthSpec> From<ReadyWork> for WorkEvent<E> {
    /// Convert work that has become ready in the re-processing queue back into a
    /// `WorkEvent` so it can flow through the normal scheduling path.
    ///
    /// Note the `drop_during_sync` flags below: block-import, RPC-block and backfill
    /// work is kept even while syncing (`false`), whereas attestation, aggregate,
    /// light-client and column-reconstruction work is droppable (`true`).
    fn from(ready_work: ReadyWork) -> Self {
        match ready_work {
            ReadyWork::Block(QueuedGossipBlock {
                beacon_block_slot,
                beacon_block_root,
                process_fn,
            }) => Self {
                drop_during_sync: false,
                work: Work::DelayedImportBlock {
                    beacon_block_slot,
                    beacon_block_root,
                    process_fn,
                },
            },
            ReadyWork::RpcBlock(QueuedRpcBlock {
                beacon_block_root: _,
                process_fn,
                ignore_fn: _,
            }) => Self {
                drop_during_sync: false,
                work: Work::RpcBlock { process_fn },
            },
            ReadyWork::IgnoredRpcBlock(IgnoredRpcBlock { process_fn }) => Self {
                drop_during_sync: false,
                work: Work::IgnoredRpcBlock { process_fn },
            },
            ReadyWork::Unaggregate(QueuedUnaggregate {
                beacon_block_root: _,
                process_fn,
            }) => Self {
                drop_during_sync: true,
                work: Work::UnknownBlockAttestation { process_fn },
            },
            ReadyWork::Aggregate(QueuedAggregate {
                process_fn,
                beacon_block_root: _,
            }) => Self {
                drop_during_sync: true,
                work: Work::UnknownBlockAggregate { process_fn },
            },
            ReadyWork::LightClientUpdate(QueuedLightClientUpdate {
                parent_root,
                process_fn,
            }) => Self {
                drop_during_sync: true,
                work: Work::UnknownLightClientOptimisticUpdate {
                    parent_root,
                    process_fn,
                },
            },
            ReadyWork::BackfillSync(QueuedBackfillBatch(process_fn)) => Self {
                drop_during_sync: false,
                work: Work::ChainSegmentBackfill(process_fn),
            },
            ReadyWork::ColumnReconstruction(QueuedColumnReconstruction { process_fn, .. }) => {
                Self {
                    drop_during_sync: true,
                    work: Work::ColumnReconstruction(process_fn),
                }
            }
        }
    }
}

/// Items required to verify a batch of unaggregated gossip attestations.
#[derive(Debug)]
pub struct GossipAttestationPackage<T> {
    /// Gossipsub message ID of the attestation.
    pub message_id: MessageId,
    /// The peer that forwarded the attestation to us.
    pub peer_id: PeerId,
    /// The attestation itself (boxed to keep the package small).
    pub attestation: Box<T>,
    /// The gossip subnet the attestation arrived on.
    pub subnet_id: SubnetId,
    /// Whether to import the attestation after verification
    /// (NOTE(review): exact semantics live in the processing fns — confirm there).
    pub should_import: bool,
    /// When the attestation was first seen (presumably relative to the unix epoch —
    /// confirm against the caller that populates it).
    pub seen_timestamp: Duration,
}

/// Items required to verify a batch of aggregated gossip attestations.
#[derive(Debug)]
pub struct GossipAggregatePackage<E: EthSpec> {
    /// Gossipsub message ID of the aggregate.
    pub message_id: MessageId,
    /// The peer that forwarded the aggregate to us.
    pub peer_id: PeerId,
    /// The signed aggregate-and-proof (boxed to keep the package small).
    pub aggregate: Box<SignedAggregateAndProof<E>>,
    /// Root of the block the aggregate attests to.
    pub beacon_block_root: Hash256,
    /// When the aggregate was first seen (presumably relative to the unix epoch —
    /// confirm against the caller that populates it).
    pub seen_timestamp: Duration,
}

/// A cloneable wrapper around the `BeaconProcessor` work-event sender that records a
/// per-work-type metric whenever a send fails (see `try_send`).
#[derive(Clone)]
pub struct BeaconProcessorSend<E: EthSpec>(pub mpsc::Sender<WorkEvent<E>>);

impl<E: EthSpec> BeaconProcessorSend<E> {
    /// Attempt to enqueue `message` without blocking.
    ///
    /// On failure (queue full or channel closed), increments the per-work-type send-error
    /// metric and returns the original message inside the error.
    pub fn try_send(&self, message: WorkEvent<E>) -> Result<(), TrySendError<WorkEvent<E>>> {
        // Capture the work type before `message` is moved into the channel.
        let work_type = message.work_type();
        self.0.try_send(message).map_err(|err| {
            metrics::inc_counter_vec(
                &metrics::BEACON_PROCESSOR_SEND_ERROR_PER_WORK_TYPE,
                &[work_type.into()],
            );
            err
        })
    }
}

/// A boxed future used for asynchronous work items.
pub type AsyncFn = Pin<Box<dyn Future<Output = ()> + Send + Sync>>;
/// A boxed closure used for blocking (synchronous) work items.
pub type BlockingFn = Box<dyn FnOnce() + Send + Sync>;
/// A blocking closure that receives a `SendOnDrop` (presumably so it can control when the
/// worker-idle signal is sent — confirm at the call site).
pub type BlockingFnWithManualSendOnIdle = Box<dyn FnOnce(SendOnDrop) + Send + Sync>;
/// Work that may be either blocking or asynchronous (used by the API request variants).
pub enum BlockingOrAsync {
    Blocking(BlockingFn),
    Async(AsyncFn),
}
/// A batch of unaggregated gossip attestations to be verified together.
pub type GossipAttestationBatch = Vec<GossipAttestationPackage<SingleAttestation>>;

/// Indicates the type of work to be performed and therefore its priority and
/// queuing specifics.
pub enum Work<E: EthSpec> {
    // -- Attestation / aggregate verification (individual, batched, and re-queued) --
    GossipAttestation {
        attestation: Box<GossipAttestationPackage<SingleAttestation>>,
        process_individual:
            Box<dyn FnOnce(GossipAttestationPackage<SingleAttestation>) + Send + Sync>,
        process_batch: Box<dyn FnOnce(GossipAttestationBatch) + Send + Sync>,
    },
    // Re-queued attestation whose target block was unknown (see `From<ReadyWork>`).
    UnknownBlockAttestation {
        process_fn: BlockingFn,
    },
    GossipAttestationBatch {
        attestations: GossipAttestationBatch,
        process_batch: Box<dyn FnOnce(GossipAttestationBatch) + Send + Sync>,
    },
    GossipAggregate {
        aggregate: Box<GossipAggregatePackage<E>>,
        process_individual: Box<dyn FnOnce(GossipAggregatePackage<E>) + Send + Sync>,
        process_batch: Box<dyn FnOnce(Vec<GossipAggregatePackage<E>>) + Send + Sync>,
    },
    // Re-queued aggregate whose target block was unknown (see `From<ReadyWork>`).
    UnknownBlockAggregate {
        process_fn: BlockingFn,
    },
    UnknownLightClientOptimisticUpdate {
        parent_root: Hash256,
        process_fn: BlockingFn,
    },
    GossipAggregateBatch {
        aggregates: Vec<GossipAggregatePackage<E>>,
        process_batch: Box<dyn FnOnce(Vec<GossipAggregatePackage<E>>) + Send + Sync>,
    },
    // -- Block / blob / data-column import --
    GossipBlock(AsyncFn),
    GossipBlobSidecar(AsyncFn),
    GossipDataColumnSidecar(AsyncFn),
    DelayedImportBlock {
        beacon_block_slot: Slot,
        beacon_block_root: Hash256,
        process_fn: AsyncFn,
    },
    // -- Gossiped beacon operations --
    GossipVoluntaryExit(BlockingFn),
    GossipProposerSlashing(BlockingFn),
    GossipAttesterSlashing(BlockingFn),
    GossipSyncSignature(BlockingFn),
    GossipSyncContribution(BlockingFn),
    GossipLightClientFinalityUpdate(BlockingFn),
    GossipLightClientOptimisticUpdate(BlockingFn),
    RpcBlock {
        process_fn: AsyncFn,
    },
    RpcBlobs {
        process_fn: AsyncFn,
    },
    RpcCustodyColumn(AsyncFn),
    ColumnReconstruction(AsyncFn),
    IgnoredRpcBlock {
        process_fn: BlockingFn,
    },
    ChainSegment(AsyncFn),
    ChainSegmentBackfill(BlockingFn),
    // -- Req/resp request handling --
    Status(BlockingFn),
    BlocksByRangeRequest(AsyncFn),
    BlocksByRootsRequest(AsyncFn),
    BlobsByRangeRequest(BlockingFn),
    BlobsByRootsRequest(BlockingFn),
    DataColumnsByRootsRequest(BlockingFn),
    DataColumnsByRangeRequest(BlockingFn),
    GossipBlsToExecutionChange(BlockingFn),
    LightClientBootstrapRequest(BlockingFn),
    LightClientOptimisticUpdateRequest(BlockingFn),
    LightClientFinalityUpdateRequest(BlockingFn),
    LightClientUpdatesByRangeRequest(BlockingFn),
    // -- HTTP API work, split by priority (P0 before P1) --
    ApiRequestP0(BlockingOrAsync),
    ApiRequestP1(BlockingOrAsync),
    // A message for the re-processing scheduler itself.
    Reprocess(ReprocessQueueMessage),
}

impl<E: EthSpec> fmt::Debug for Work<E> {
    /// Formats as the snake_case name of the corresponding `WorkType`
    /// (via the `IntoStaticStr` derive on `WorkType`).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name: &'static str = self.to_type().into();
        f.write_str(name)
    }
}

/// The flat "kind" of a `Work` item, used for metrics labels and queue routing.
///
/// `IntoStaticStr` with `snake_case` gives each variant a stable string name.
///
/// NOTE(review): `GossipAttestationToConvert` is never produced by `Work::to_type` in
/// this file — presumably it is mapped elsewhere; verify before relying on it.
#[derive(IntoStaticStr, PartialEq, Eq, Debug, Clone)]
#[strum(serialize_all = "snake_case")]
pub enum WorkType {
    GossipAttestation,
    GossipAttestationToConvert,
    UnknownBlockAttestation,
    GossipAttestationBatch,
    GossipAggregate,
    UnknownBlockAggregate,
    UnknownLightClientOptimisticUpdate,
    GossipAggregateBatch,
    GossipBlock,
    GossipBlobSidecar,
    GossipDataColumnSidecar,
    DelayedImportBlock,
    GossipVoluntaryExit,
    GossipProposerSlashing,
    GossipAttesterSlashing,
    GossipSyncSignature,
    GossipSyncContribution,
    GossipLightClientFinalityUpdate,
    GossipLightClientOptimisticUpdate,
    RpcBlock,
    RpcBlobs,
    RpcCustodyColumn,
    ColumnReconstruction,
    IgnoredRpcBlock,
    ChainSegment,
    ChainSegmentBackfill,
    Status,
    BlocksByRangeRequest,
    BlocksByRootsRequest,
    BlobsByRangeRequest,
    BlobsByRootsRequest,
    DataColumnsByRootsRequest,
    DataColumnsByRangeRequest,
    GossipBlsToExecutionChange,
    LightClientBootstrapRequest,
    LightClientOptimisticUpdateRequest,
    LightClientFinalityUpdateRequest,
    LightClientUpdatesByRangeRequest,
    ApiRequestP0,
    ApiRequestP1,
    Reprocess,
}

impl<E: EthSpec> Work<E> {
    /// Provides a `&str` that uniquely identifies each enum variant.
    fn str_id(&self) -> &'static str {
        self.to_type().into()
    }

    /// Maps this work item onto its corresponding `WorkType`.
    fn to_type(&self) -> WorkType {
        match self {
            Work::GossipAttestation { .. } => WorkType::GossipAttestation,
            Work::GossipAttestationBatch { .. } => WorkType::GossipAttestationBatch,
            Work::GossipAggregate { .. } => WorkType::GossipAggregate,
            Work::GossipAggregateBatch { .. } => WorkType::GossipAggregateBatch,
            Work::GossipBlock(_) => WorkType::GossipBlock,
            Work::GossipBlobSidecar(_) => WorkType::GossipBlobSidecar,
            Work::GossipDataColumnSidecar(_) => WorkType::GossipDataColumnSidecar,
            Work::DelayedImportBlock { .. } => WorkType::DelayedImportBlock,
            Work::GossipVoluntaryExit(_) => WorkType::GossipVoluntaryExit,
            Work::GossipProposerSlashing(_) => WorkType::GossipProposerSlashing,
            Work::GossipAttesterSlashing(_) => WorkType::GossipAttesterSlashing,
            Work::GossipSyncSignature(_) => WorkType::GossipSyncSignature,
            Work::GossipSyncContribution(_) => WorkType::GossipSyncContribution,
            Work::GossipLightClientFinalityUpdate(_) => WorkType::GossipLightClientFinalityUpdate,
            Work::GossipLightClientOptimisticUpdate(_) => {
                WorkType::GossipLightClientOptimisticUpdate
            }
            Work::GossipBlsToExecutionChange(_) => WorkType::GossipBlsToExecutionChange,
            Work::RpcBlock { .. } => WorkType::RpcBlock,
            Work::RpcBlobs { .. } => WorkType::RpcBlobs,
            Work::RpcCustodyColumn { .. } => WorkType::RpcCustodyColumn,
            Work::ColumnReconstruction(_) => WorkType::ColumnReconstruction,
            Work::IgnoredRpcBlock { .. } => WorkType::IgnoredRpcBlock,
            Work::ChainSegment { .. } => WorkType::ChainSegment,
            Work::ChainSegmentBackfill(_) => WorkType::ChainSegmentBackfill,
            Work::Status(_) => WorkType::Status,
            Work::BlocksByRangeRequest(_) => WorkType::BlocksByRangeRequest,
            Work::BlocksByRootsRequest(_) => WorkType::BlocksByRootsRequest,
            Work::BlobsByRangeRequest(_) => WorkType::BlobsByRangeRequest,
            Work::BlobsByRootsRequest(_) => WorkType::BlobsByRootsRequest,
            Work::DataColumnsByRootsRequest(_) => WorkType::DataColumnsByRootsRequest,
            Work::DataColumnsByRangeRequest(_) => WorkType::DataColumnsByRangeRequest,
            Work::LightClientBootstrapRequest(_) => WorkType::LightClientBootstrapRequest,
            Work::LightClientOptimisticUpdateRequest(_) => {
                WorkType::LightClientOptimisticUpdateRequest
            }
            Work::LightClientFinalityUpdateRequest(_) => WorkType::LightClientFinalityUpdateRequest,
            Work::LightClientUpdatesByRangeRequest(_) => WorkType::LightClientUpdatesByRangeRequest,
            Work::UnknownBlockAttestation { .. } => WorkType::UnknownBlockAttestation,
            Work::UnknownBlockAggregate { .. } => WorkType::UnknownBlockAggregate,
            Work::UnknownLightClientOptimisticUpdate { .. } => {
                WorkType::UnknownLightClientOptimisticUpdate
            }
            Work::ApiRequestP0 { .. } => WorkType::ApiRequestP0,
            Work::ApiRequestP1 { .. } => WorkType::ApiRequestP1,
            Work::Reprocess { .. } => WorkType::Reprocess,
        }
    }
}

/// Unifies all the messages processed by the `BeaconProcessor`.
enum InboundEvent<E: EthSpec> {
    /// A worker has completed a task and is free.
    WorkerIdle,
    /// There is new work to be done. The `Instant` records when the event was pulled off
    /// the channel (presumably used for queueing-latency metrics — confirm downstream).
    WorkEvent((WorkEvent<E>, Instant)),
    /// A work event that was queued for re-processing has become ready. The `Instant`
    /// records when it became ready.
    ReprocessingWork((WorkEvent<E>, Instant)),
}

/// Combines the various incoming event streams for the `BeaconProcessor` into a single stream.
///
/// This struct has a similar purpose to `tokio::select!`, however it allows for more fine-grained
/// control (specifically in the ordering of event processing): idle notifications are
/// polled first, then re-processed work, then new work (see the `Stream` impl).
struct InboundEvents<E: EthSpec> {
    /// Used by workers when they finish a task.
    idle_rx: mpsc::Receiver<WorkType>,
    /// Used by upstream processes to send new work to the `BeaconProcessor`.
    event_rx: mpsc::Receiver<WorkEvent<E>>,
    /// Used internally for queuing work ready to be re-processed.
    ready_work_rx: mpsc::Receiver<ReadyWork>,
}

impl<E: EthSpec> Stream for InboundEvents<E> {
    type Item = InboundEvent<E>;

    /// Polls the three input channels in strict priority order: worker-idle
    /// notifications first, then re-processed ("ready") work, then new work.
    ///
    /// A closed channel (`Ready(None)`) terminates the stream.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Always check for idle workers before anything else. This allows us to ensure that a big
        // stream of new events doesn't suppress the processing of existing events.
        if let Poll::Ready(maybe_idle) = self.idle_rx.poll_recv(cx) {
            return Poll::Ready(maybe_idle.map(|_| InboundEvent::WorkerIdle));
        }

        // Poll for delayed blocks before polling for new work. It might be the case that a delayed
        // block is required to successfully process some new work.
        if let Poll::Ready(maybe_ready) = self.ready_work_rx.poll_recv(cx) {
            return Poll::Ready(
                maybe_ready
                    .map(|ready| InboundEvent::ReprocessingWork((ready.into(), Instant::now()))),
            );
        }

        if let Poll::Ready(maybe_event) = self.event_rx.poll_recv(cx) {
            return Poll::Ready(
                maybe_event.map(|event| InboundEvent::WorkEvent((event, Instant::now()))),
            );
        }

        Poll::Pending
    }
}

/// A multi-threaded processor for messages received on the network
/// that need to be processed by the `BeaconChain`.
///
/// See module level documentation for more information.
pub struct BeaconProcessor<E: EthSpec> {
    /// Shared network globals.
    pub network_globals: Arc<NetworkGlobals<E>>,
    /// Executor used to spawn the manager and worker tasks.
    pub executor: TaskExecutor,
    /// Number of worker tasks currently running.
    pub current_workers: usize,
    /// Runtime configuration (worker cap, queue bounds, batch sizes).
    pub config: BeaconProcessorConfig,
}

impl<E: EthSpec> BeaconProcessor<E> {
    /// Spawns the "manager" task which checks the receiver end of the returned `Sender` for
    /// messages which contain some new work which will be:
    ///
    /// - Performed immediately, if a worker is available.
    /// - Queued for later processing, if no worker is currently available.
    ///
    /// Only `self.config.max_workers` will ever be spawned at one time. Each worker is a `tokio` task
    /// started with `spawn_blocking`.
    ///
    /// The optional `work_journal_tx` allows for an outside process to receive a log of all work
    /// events processed by `self`. This should only be used during testing.
    #[allow(clippy::too_many_arguments)]
    pub fn spawn_manager<S: SlotClock + 'static>(
        mut self,
        event_rx: mpsc::Receiver<WorkEvent<E>>,
        work_journal_tx: Option<mpsc::Sender<&'static str>>,
        slot_clock: S,
        maximum_gossip_clock_disparity: Duration,
        queue_lengths: BeaconProcessorQueueLengths,
    ) -> Result<(), String> {
        // Used by workers to communicate that they are finished a task.
        let (idle_tx, idle_rx) = mpsc::channel::<WorkType>(MAX_IDLE_QUEUE_LEN);

        // Using LIFO queues for attestations since validator profits rely upon getting fresh
        // attestations into blocks. Additionally, later attestations contain more information than
        // earlier ones, so we consider them more valuable.
        let mut aggregate_queue = LifoQueue::new(queue_lengths.aggregate_queue);
        let mut aggregate_debounce = TimeLatch::default();
        let mut attestation_queue = LifoQueue::new(queue_lengths.attestation_queue);
        let mut attestation_to_convert_queue = LifoQueue::new(queue_lengths.attestation_queue);
        let mut attestation_debounce = TimeLatch::default();
        let mut unknown_block_aggregate_queue =
            LifoQueue::new(queue_lengths.unknown_block_aggregate_queue);
        let mut unknown_block_attestation_queue =
            LifoQueue::new(queue_lengths.unknown_block_attestation_queue);

        let mut sync_message_queue = LifoQueue::new(queue_lengths.sync_message_queue);
        let mut sync_contribution_queue = LifoQueue::new(queue_lengths.sync_contribution_queue);

        // Using a FIFO queue for voluntary exits since it prevents exit censoring. I don't have
        // a strong feeling about queue type for exits.
        let mut gossip_voluntary_exit_queue =
            FifoQueue::new(queue_lengths.gossip_voluntary_exit_queue);

        // Using a FIFO queue for slashing to prevent people from flushing their slashings from the
        // queues with lots of junk messages.
        let mut gossip_proposer_slashing_queue =
            FifoQueue::new(queue_lengths.gossip_proposer_slashing_queue);
        let mut gossip_attester_slashing_queue =
            FifoQueue::new(queue_lengths.gossip_attester_slashing_queue);

        // Using a FIFO queue since blocks need to be imported sequentially.
        let mut rpc_block_queue = FifoQueue::new(queue_lengths.rpc_block_queue);
        let mut rpc_blob_queue = FifoQueue::new(queue_lengths.rpc_blob_queue);
        let mut rpc_custody_column_queue = FifoQueue::new(queue_lengths.rpc_custody_column_queue);
        let mut column_reconstruction_queue =
            LifoQueue::new(queue_lengths.column_reconstruction_queue);
        let mut chain_segment_queue = FifoQueue::new(queue_lengths.chain_segment_queue);
        let mut backfill_chain_segment = FifoQueue::new(queue_lengths.backfill_chain_segment);
        let mut gossip_block_queue = FifoQueue::new(queue_lengths.gossip_block_queue);
        let mut gossip_blob_queue = FifoQueue::new(queue_lengths.gossip_blob_queue);
        let mut gossip_data_column_queue = FifoQueue::new(queue_lengths.gossip_data_column_queue);
        let mut delayed_block_queue = FifoQueue::new(queue_lengths.delayed_block_queue);

        let mut status_queue = FifoQueue::new(queue_lengths.status_queue);
        let mut bbrange_queue = FifoQueue::new(queue_lengths.bbrange_queue);
        let mut bbroots_queue = FifoQueue::new(queue_lengths.bbroots_queue);
        let mut blbroots_queue = FifoQueue::new(queue_lengths.blbroots_queue);
        let mut blbrange_queue = FifoQueue::new(queue_lengths.blbrange_queue);
        let mut dcbroots_queue = FifoQueue::new(queue_lengths.dcbroots_queue);
        let mut dcbrange_queue = FifoQueue::new(queue_lengths.dcbrange_queue);

        let mut gossip_bls_to_execution_change_queue =
            FifoQueue::new(queue_lengths.gossip_bls_to_execution_change_queue);

        // Using FIFO queues for light client updates to maintain sequence order.
        let mut lc_gossip_finality_update_queue =
            FifoQueue::new(queue_lengths.lc_gossip_finality_update_queue);
        let mut lc_gossip_optimistic_update_queue =
            FifoQueue::new(queue_lengths.lc_gossip_optimistic_update_queue);
        let mut unknown_light_client_update_queue =
            FifoQueue::new(queue_lengths.unknown_light_client_update_queue);
        let mut lc_bootstrap_queue = FifoQueue::new(queue_lengths.lc_bootstrap_queue);
        let mut lc_rpc_optimistic_update_queue =
            FifoQueue::new(queue_lengths.lc_rpc_optimistic_update_queue);
        let mut lc_rpc_finality_update_queue =
            FifoQueue::new(queue_lengths.lc_rpc_finality_update_queue);
        let mut lc_update_range_queue = FifoQueue::new(queue_lengths.lc_update_range_queue);

        let mut api_request_p0_queue = FifoQueue::new(queue_lengths.api_request_p0_queue);
        let mut api_request_p1_queue = FifoQueue::new(queue_lengths.api_request_p1_queue);

        // Channels for sending work to the re-process scheduler (`work_reprocessing_tx`) and to
        // receive them back once they are ready (`ready_work_rx`).
        let (ready_work_tx, ready_work_rx) =
            mpsc::channel::<ReadyWork>(self.config.max_scheduled_work_queue_len);

        let (reprocess_work_tx, reprocess_work_rx) =
            mpsc::channel::<ReprocessQueueMessage>(self.config.max_scheduled_work_queue_len);

        spawn_reprocess_scheduler(
            ready_work_tx,
            reprocess_work_rx,
            &self.executor,
            Arc::new(slot_clock),
            maximum_gossip_clock_disparity,
        )?;

        let executor = self.executor.clone();

        // The manager future will run on the core executor and delegate tasks to worker
        // threads on the blocking executor.
        let manager_future = async move {
            let mut inbound_events = InboundEvents {
                idle_rx,
                event_rx,
                ready_work_rx,
            };

            let enable_backfill_rate_limiting = self.config.enable_backfill_rate_limiting;

            loop {
                let (work_event, created_timestamp) = match inbound_events.next().await {
                    Some(InboundEvent::WorkerIdle) => {
                        self.current_workers = self.current_workers.saturating_sub(1);
                        (None, Instant::now())
                    }
                    Some(InboundEvent::WorkEvent((event, created_timestamp)))
                        if enable_backfill_rate_limiting =>
                    {
                        match QueuedBackfillBatch::try_from(event) {
                            Ok(backfill_batch) => {
                                match reprocess_work_tx
                                    .try_send(ReprocessQueueMessage::BackfillSync(backfill_batch))
                                {
                                    Err(e) => {
                                        warn!(
                                            error = %e,
                                            "Unable to queue backfill work event. Will try to process now."
                                        );
                                        match e {
                                            TrySendError::Full(reprocess_queue_message)
                                            | TrySendError::Closed(reprocess_queue_message) => {
                                                match reprocess_queue_message {
                                                    ReprocessQueueMessage::BackfillSync(
                                                        backfill_batch,
                                                    ) => (
                                                        Some(backfill_batch.into()),
                                                        created_timestamp,
                                                    ),
                                                    other => {
                                                        crit!(
                                                            message_type = other.as_ref(),
                                                            "Unexpected queue message type"
                                                        );
                                                        // This is an unhandled exception, drop the message.
                                                        continue;
                                                    }
                                                }
                                            }
                                        }
                                    }
                                    Ok(..) => {
                                        // backfill work sent to "reprocessing" queue. Process the next event.
                                        continue;
                                    }
                                }
                            }
                            Err(event) => (Some(event), created_timestamp),
                        }
                    }
                    Some(InboundEvent::WorkEvent((event, created_timestamp)))
                    | Some(InboundEvent::ReprocessingWork((event, created_timestamp))) => {
                        (Some(event), created_timestamp)
                    }
                    None => {
                        debug!(msg = "stream ended", "Gossip processor stopped");
                        break;
                    }
                };

                let _event_timer =
                    metrics::start_timer(&metrics::BEACON_PROCESSOR_EVENT_HANDLING_SECONDS);
                if let Some(event) = &work_event {
                    metrics::inc_counter_vec(
                        &metrics::BEACON_PROCESSOR_WORK_EVENTS_RX_COUNT,
                        &[event.work.str_id()],
                    );
                } else {
                    metrics::inc_counter(&metrics::BEACON_PROCESSOR_IDLE_EVENTS_TOTAL);
                }

                if let Some(work_journal_tx) = &work_journal_tx {
                    let id = work_event
                        .as_ref()
                        .map(|event| event.work.str_id())
                        .unwrap_or(WORKER_FREED);

                    // We don't care if this message was successfully sent, we only use the journal
                    // during testing. We also ignore reprocess messages to ensure our test cases can pass.
                    if id != "reprocess" {
                        let _ = work_journal_tx.try_send(id);
                    }
                }

                let can_spawn = self.current_workers < self.config.max_workers;
                let drop_during_sync = work_event
                    .as_ref()
                    .is_some_and(|event| event.drop_during_sync);

                let idle_tx = idle_tx.clone();
                let modified_queue_id = match work_event {
                    // There is no new work event, but we are able to spawn a new worker.
                    //
                    // We don't check the `work.drop_during_sync` here. We assume that if it made
                    // it into the queue at any point then we should process it.
                    None if can_spawn => {
                        // Check for chain segments first, they're the most efficient way to get
                        // blocks into the system.
                        let work_event: Option<Work<E>> =
                            if let Some(item) = chain_segment_queue.pop() {
                                Some(item)
                            // Check sync blocks before gossip blocks, since we've already explicitly
                            // requested these blocks.
                            } else if let Some(item) = rpc_block_queue.pop() {
                                Some(item)
                            } else if let Some(item) = rpc_blob_queue.pop() {
                                Some(item)
                            } else if let Some(item) = rpc_custody_column_queue.pop() {
                                Some(item)
                            } else if let Some(item) = rpc_custody_column_queue.pop() {
                                Some(item)
                            // Check delayed blocks before gossip blocks, the gossip blocks might rely
                            // on the delayed ones.
                            } else if let Some(item) = delayed_block_queue.pop() {
                                Some(item)
                            // Check gossip blocks before gossip attestations, since a block might be
                            // required to verify some attestations.
                            } else if let Some(item) = gossip_block_queue.pop() {
                                Some(item)
                            } else if let Some(item) = gossip_blob_queue.pop() {
                                Some(item)
                            } else if let Some(item) = gossip_data_column_queue.pop() {
                                Some(item)
                            } else if let Some(item) = column_reconstruction_queue.pop() {
                                Some(item)
                            // Check the priority 0 API requests after blocks and blobs, but before attestations.
                            } else if let Some(item) = api_request_p0_queue.pop() {
                                Some(item)
                            // Check the aggregates, *then* the unaggregates since we assume that
                            // aggregates are more valuable to local validators and effectively give us
                            // more information with less signature verification time.
                            } else if aggregate_queue.len() > 0 {
                                let batch_size = cmp::min(
                                    aggregate_queue.len(),
                                    self.config.max_gossip_aggregate_batch_size,
                                );

                                if batch_size < 2 {
                                    // One single aggregate is in the queue, process it individually.
                                    aggregate_queue.pop()
                                } else {
                                    // Collect two or more aggregates into a batch, so they can take
                                    // advantage of batch signature verification.
                                    //
                                    // Note: this will convert the `Work::GossipAggregate` item into a
                                    // `Work::GossipAggregateBatch` item.
                                    let mut aggregates = Vec::with_capacity(batch_size);
                                    let mut process_batch_opt = None;
                                    for _ in 0..batch_size {
                                        if let Some(item) = aggregate_queue.pop() {
                                            match item {
                                                Work::GossipAggregate {
                                                    aggregate,
                                                    process_individual: _,
                                                    process_batch,
                                                } => {
                                                    aggregates.push(*aggregate);
                                                    if process_batch_opt.is_none() {
                                                        process_batch_opt = Some(process_batch);
                                                    }
                                                }
                                                _ => {
                                                    error!("Invalid item in aggregate queue");
                                                }
                                            }
                                        }
                                    }

                                    if let Some(process_batch) = process_batch_opt {
                                        // Process all aggregates with a single worker.
                                        Some(Work::GossipAggregateBatch {
                                            aggregates,
                                            process_batch,
                                        })
                                    } else {
                                        // There is no good reason for this to
                                        // happen, it is a serious logic error.
                                        // Since we only form batches when multiple
                                        // work items exist, we should always have a
                                        // work closure at this point.
                                        crit!("Missing aggregate work");
                                        None
                                    }
                                }
                            // Check the unaggregated attestation queue.
                            //
                            // Potentially use batching.
                            } else if attestation_queue.len() > 0 {
                                let batch_size = cmp::min(
                                    attestation_queue.len(),
                                    self.config.max_gossip_attestation_batch_size,
                                );

                                if batch_size < 2 {
                                    // One single attestation is in the queue, process it individually.
                                    attestation_queue.pop()
                                } else {
                                    // Collect two or more attestations into a batch, so they can take
                                    // advantage of batch signature verification.
                                    //
                                    // Note: this will convert the `Work::GossipAttestation` item into a
                                    // `Work::GossipAttestationBatch` item.
                                    let mut attestations = Vec::with_capacity(batch_size);
                                    let mut process_batch_opt = None;
                                    for _ in 0..batch_size {
                                        if let Some(item) = attestation_queue.pop() {
                                            match item {
                                                Work::GossipAttestation {
                                                    attestation,
                                                    process_individual: _,
                                                    process_batch,
                                                } => {
                                                    attestations.push(*attestation);
                                                    if process_batch_opt.is_none() {
                                                        process_batch_opt = Some(process_batch);
                                                    }
                                                }
                                                _ => error!("Invalid item in attestation queue"),
                                            }
                                        }
                                    }

                                    if let Some(process_batch) = process_batch_opt {
                                        // Process all attestations with a single worker.
                                        Some(Work::GossipAttestationBatch {
                                            attestations,
                                            process_batch,
                                        })
                                    } else {
                                        // There is no good reason for this to
                                        // happen, it is a serious logic error.
                                        // Since we only form batches when multiple
                                        // work items exist, we should always have a
                                        // work closure at this point.
                                        crit!("Missing attestations work");
                                        None
                                    }
                                }
                            // Convert any gossip attestations that need to be converted.
                            } else if let Some(item) = attestation_to_convert_queue.pop() {
                                Some(item)
                            // Check sync committee messages after attestations as their rewards are lesser
                            // and they don't influence fork choice.
                            } else if let Some(item) = sync_contribution_queue.pop() {
                                Some(item)
                            } else if let Some(item) = sync_message_queue.pop() {
                                Some(item)
                            // Aggregates and unaggregates queued for re-processing are older and we
                            // care about fresher ones, so check those first.
                            } else if let Some(item) = unknown_block_aggregate_queue.pop() {
                                Some(item)
                            } else if let Some(item) = unknown_block_attestation_queue.pop() {
                                Some(item)
                            // Check RPC methods next. Status messages are needed for sync so
                            // prioritize them over syncing requests from other peers (BlocksByRange
                            // and BlocksByRoot)
                            } else if let Some(item) = status_queue.pop() {
                                Some(item)
                            } else if let Some(item) = bbrange_queue.pop() {
                                Some(item)
                            } else if let Some(item) = bbroots_queue.pop() {
                                Some(item)
                            } else if let Some(item) = blbrange_queue.pop() {
                                Some(item)
                            } else if let Some(item) = blbroots_queue.pop() {
                                Some(item)
                            } else if let Some(item) = dcbroots_queue.pop() {
                                Some(item)
                            } else if let Some(item) = dcbrange_queue.pop() {
                                Some(item)
                            // Check slashings after all other consensus messages so we prioritize
                            // following head.
                            //
                            // Check attester slashings before proposer slashings since they have the
                            // potential to slash multiple validators at once.
                            } else if let Some(item) = gossip_attester_slashing_queue.pop() {
                                Some(item)
                            } else if let Some(item) = gossip_proposer_slashing_queue.pop() {
                                Some(item)
                            // Check exits and address changes late since our validators don't get
                            // rewards from them.
                            } else if let Some(item) = gossip_voluntary_exit_queue.pop() {
                                Some(item)
                            } else if let Some(item) = gossip_bls_to_execution_change_queue.pop() {
                                Some(item)
                            // Check the priority 1 API requests after we've
                            // processed all the interesting things from the network
                            // and things required for us to stay in good repute
                            // with our P2P peers.
                            } else if let Some(item) = api_request_p1_queue.pop() {
                                Some(item)
                            // Handle backfill sync chain segments.
                            } else if let Some(item) = backfill_chain_segment.pop() {
                                Some(item)
                                // Handle light client requests.
                            } else if let Some(item) = lc_gossip_finality_update_queue.pop() {
                                Some(item)
                            } else if let Some(item) = lc_gossip_optimistic_update_queue.pop() {
                                Some(item)
                            } else if let Some(item) = unknown_light_client_update_queue.pop() {
                                Some(item)
                            } else if let Some(item) = lc_bootstrap_queue.pop() {
                                Some(item)
                            } else if let Some(item) = lc_rpc_optimistic_update_queue.pop() {
                                Some(item)
                            } else if let Some(item) = lc_rpc_finality_update_queue.pop() {
                                Some(item)
                            } else if let Some(item) = lc_update_range_queue.pop() {
                                Some(item)
                                // This statement should always be the final else statement.
                            } else {
                                // Let the journal know that a worker is freed and there's nothing else
                                // for it to do.
                                if let Some(work_journal_tx) = &work_journal_tx {
                                    // We don't care if this message was successfully sent, we only use the journal
                                    // during testing.
                                    let _ = work_journal_tx.try_send(NOTHING_TO_DO);
                                }
                                None
                            };

                        if let Some(work_event) = work_event {
                            let work_type = work_event.to_type();
                            self.spawn_worker(work_event, created_timestamp, idle_tx);
                            Some(work_type)
                        } else {
                            None
                        }
                    }
                    // There is no new work event and we are unable to spawn a new worker.
                    //
                    // I cannot see any good reason why this would happen.
                    None => {
                        warn!(
                            msg = "no new work and cannot spawn worker",
                            "Unexpected gossip processor condition"
                        );
                        None
                    }
                    // The chain is syncing and this event should be dropped during sync.
                    Some(work_event)
                        if self.network_globals.sync_state.read().is_syncing()
                            && drop_during_sync =>
                    {
                        let work_id = work_event.work.str_id();
                        metrics::inc_counter_vec(
                            &metrics::BEACON_PROCESSOR_WORK_EVENTS_IGNORED_COUNT,
                            &[work_id],
                        );
                        trace!(
                            msg = "chain is syncing",
                            work_id = work_id,
                            "Gossip processor skipping work"
                        );
                        None
                    }
                    // There is a new work event and the chain is not syncing. Process it or queue
                    // it.
                    Some(WorkEvent { work, .. }) => {
                        let work_id = work.str_id();
                        let work_type = work.to_type();

                        match work {
                            Work::Reprocess(work_event) => {
                                if let Err(e) = reprocess_work_tx.try_send(work_event) {
                                    error!(
                                        error = ?e,
                                        "Failed to reprocess work event"
                                    )
                                }
                            }
                            _ if can_spawn => self.spawn_worker(work, created_timestamp, idle_tx),
                            Work::GossipAttestation { .. } => attestation_queue.push(work),
                            // Attestation batches are formed internally within the
                            // `BeaconProcessor`, they are not sent from external services.
                            Work::GossipAttestationBatch { .. } => crit!(
                                work_type = "GossipAttestationBatch",
                                "Unsupported inbound event"
                            ),
                            Work::GossipAggregate { .. } => aggregate_queue.push(work),
                            // Aggregate batches are formed internally within the `BeaconProcessor`,
                            // they are not sent from external services.
                            Work::GossipAggregateBatch { .. } => {
                                crit!(
                                    work_type = "GossipAggregateBatch",
                                    "Unsupported inbound event"
                                )
                            }
                            Work::GossipBlock { .. } => gossip_block_queue.push(work, work_id),
                            Work::GossipBlobSidecar { .. } => gossip_blob_queue.push(work, work_id),
                            Work::GossipDataColumnSidecar { .. } => {
                                gossip_data_column_queue.push(work, work_id)
                            }
                            Work::DelayedImportBlock { .. } => {
                                delayed_block_queue.push(work, work_id)
                            }
                            Work::GossipVoluntaryExit { .. } => {
                                gossip_voluntary_exit_queue.push(work, work_id)
                            }
                            Work::GossipProposerSlashing { .. } => {
                                gossip_proposer_slashing_queue.push(work, work_id)
                            }
                            Work::GossipAttesterSlashing { .. } => {
                                gossip_attester_slashing_queue.push(work, work_id)
                            }
                            Work::GossipSyncSignature { .. } => sync_message_queue.push(work),
                            Work::GossipSyncContribution { .. } => {
                                sync_contribution_queue.push(work)
                            }
                            Work::GossipLightClientFinalityUpdate { .. } => {
                                lc_gossip_finality_update_queue.push(work, work_id)
                            }
                            Work::GossipLightClientOptimisticUpdate { .. } => {
                                lc_gossip_optimistic_update_queue.push(work, work_id)
                            }
                            Work::RpcBlock { .. } | Work::IgnoredRpcBlock { .. } => {
                                rpc_block_queue.push(work, work_id)
                            }
                            Work::RpcBlobs { .. } => rpc_blob_queue.push(work, work_id),
                            Work::RpcCustodyColumn { .. } => {
                                rpc_custody_column_queue.push(work, work_id)
                            }
                            Work::ColumnReconstruction(_) => column_reconstruction_queue.push(work),
                            Work::ChainSegment { .. } => chain_segment_queue.push(work, work_id),
                            Work::ChainSegmentBackfill { .. } => {
                                backfill_chain_segment.push(work, work_id)
                            }
                            Work::Status { .. } => status_queue.push(work, work_id),
                            Work::BlocksByRangeRequest { .. } => bbrange_queue.push(work, work_id),
                            Work::BlocksByRootsRequest { .. } => bbroots_queue.push(work, work_id),
                            Work::BlobsByRangeRequest { .. } => blbrange_queue.push(work, work_id),
                            Work::LightClientBootstrapRequest { .. } => {
                                lc_bootstrap_queue.push(work, work_id)
                            }
                            Work::LightClientOptimisticUpdateRequest { .. } => {
                                lc_rpc_optimistic_update_queue.push(work, work_id)
                            }
                            Work::LightClientFinalityUpdateRequest { .. } => {
                                lc_rpc_finality_update_queue.push(work, work_id)
                            }
                            Work::LightClientUpdatesByRangeRequest { .. } => {
                                lc_update_range_queue.push(work, work_id)
                            }
                            Work::UnknownBlockAttestation { .. } => {
                                unknown_block_attestation_queue.push(work)
                            }
                            Work::UnknownBlockAggregate { .. } => {
                                unknown_block_aggregate_queue.push(work)
                            }
                            Work::GossipBlsToExecutionChange { .. } => {
                                gossip_bls_to_execution_change_queue.push(work, work_id)
                            }
                            Work::BlobsByRootsRequest { .. } => blbroots_queue.push(work, work_id),
                            Work::DataColumnsByRootsRequest { .. } => {
                                dcbroots_queue.push(work, work_id)
                            }
                            Work::DataColumnsByRangeRequest { .. } => {
                                dcbrange_queue.push(work, work_id)
                            }
                            Work::UnknownLightClientOptimisticUpdate { .. } => {
                                unknown_light_client_update_queue.push(work, work_id)
                            }
                            Work::ApiRequestP0 { .. } => api_request_p0_queue.push(work, work_id),
                            Work::ApiRequestP1 { .. } => api_request_p1_queue.push(work, work_id),
                        };
                        Some(work_type)
                    }
                };

                if let Some(modified_queue_id) = modified_queue_id {
                    let queue_len = match modified_queue_id {
                        WorkType::GossipAttestation => attestation_queue.len(),
                        WorkType::GossipAttestationToConvert => attestation_to_convert_queue.len(),
                        WorkType::UnknownBlockAttestation => unknown_block_attestation_queue.len(),
                        WorkType::GossipAttestationBatch => 0, // No queue
                        WorkType::GossipAggregate => aggregate_queue.len(),
                        WorkType::UnknownBlockAggregate => unknown_block_aggregate_queue.len(),
                        WorkType::UnknownLightClientOptimisticUpdate => {
                            unknown_light_client_update_queue.len()
                        }
                        WorkType::GossipAggregateBatch => 0, // No queue
                        WorkType::GossipBlock => gossip_block_queue.len(),
                        WorkType::GossipBlobSidecar => gossip_blob_queue.len(),
                        WorkType::GossipDataColumnSidecar => gossip_data_column_queue.len(),
                        WorkType::DelayedImportBlock => delayed_block_queue.len(),
                        WorkType::GossipVoluntaryExit => gossip_voluntary_exit_queue.len(),
                        WorkType::GossipProposerSlashing => gossip_proposer_slashing_queue.len(),
                        WorkType::GossipAttesterSlashing => gossip_attester_slashing_queue.len(),
                        WorkType::GossipSyncSignature => sync_message_queue.len(),
                        WorkType::GossipSyncContribution => sync_contribution_queue.len(),
                        WorkType::GossipLightClientFinalityUpdate => {
                            lc_gossip_finality_update_queue.len()
                        }
                        WorkType::GossipLightClientOptimisticUpdate => {
                            lc_gossip_optimistic_update_queue.len()
                        }
                        WorkType::RpcBlock => rpc_block_queue.len(),
                        WorkType::RpcBlobs | WorkType::IgnoredRpcBlock => rpc_blob_queue.len(),
                        WorkType::RpcCustodyColumn => rpc_custody_column_queue.len(),
                        WorkType::ColumnReconstruction => column_reconstruction_queue.len(),
                        WorkType::ChainSegment => chain_segment_queue.len(),
                        WorkType::ChainSegmentBackfill => backfill_chain_segment.len(),
                        WorkType::Status => status_queue.len(),
                        WorkType::BlocksByRangeRequest => blbrange_queue.len(),
                        WorkType::BlocksByRootsRequest => blbroots_queue.len(),
                        WorkType::BlobsByRangeRequest => bbrange_queue.len(),
                        WorkType::BlobsByRootsRequest => bbroots_queue.len(),
                        WorkType::DataColumnsByRootsRequest => dcbroots_queue.len(),
                        WorkType::DataColumnsByRangeRequest => dcbrange_queue.len(),
                        WorkType::GossipBlsToExecutionChange => {
                            gossip_bls_to_execution_change_queue.len()
                        }
                        WorkType::LightClientBootstrapRequest => lc_bootstrap_queue.len(),
                        WorkType::LightClientOptimisticUpdateRequest => {
                            lc_rpc_optimistic_update_queue.len()
                        }
                        WorkType::LightClientFinalityUpdateRequest => {
                            lc_rpc_finality_update_queue.len()
                        }
                        WorkType::LightClientUpdatesByRangeRequest => lc_update_range_queue.len(),
                        WorkType::ApiRequestP0 => api_request_p0_queue.len(),
                        WorkType::ApiRequestP1 => api_request_p1_queue.len(),
                        WorkType::Reprocess => 0,
                    };
                    metrics::observe_vec(
                        &metrics::BEACON_PROCESSOR_QUEUE_LENGTH,
                        &[modified_queue_id.into()],
                        queue_len as f64,
                    );
                }

                if aggregate_queue.is_full() && aggregate_debounce.elapsed() {
                    error!(
                        msg = "the system has insufficient resources for load",
                        queue_len = aggregate_queue.max_length,
                        "Aggregate attestation queue full"
                    )
                }

                if attestation_queue.is_full() && attestation_debounce.elapsed() {
                    error!(
                        msg = "the system has insufficient resources for load",
                        queue_len = attestation_queue.max_length,
                        "Attestation queue full"
                    )
                }
            }
        };

        // Spawn on the core executor.
        executor.spawn(manager_future, MANAGER_TASK_NAME);
        Ok(())
    }

    /// Spawns a blocking worker thread to process some `Work`.
    ///
    /// Sends a message on `idle_tx` when the work is complete and the task is stopping.
    fn spawn_worker(
        &mut self,
        work: Work<E>,
        created_timestamp: Instant,
        idle_tx: mpsc::Sender<WorkType>,
    ) {
        // Cache the identifiers once; they are reused for several metrics below.
        let work_id = work.str_id();
        let work_type = work.to_type();

        // This metric tracks how long a work event has been in the queue.
        metrics::observe_timer_vec(
            &metrics::BEACON_PROCESSOR_QUEUE_TIME,
            &[work_type.clone().into()],
            created_timestamp.elapsed(),
        );

        let worker_timer =
            metrics::start_timer_vec(&metrics::BEACON_PROCESSOR_WORKER_TIME, &[work_id]);
        metrics::inc_counter(&metrics::BEACON_PROCESSOR_WORKERS_SPAWNED_TOTAL);
        metrics::inc_counter_vec(
            &metrics::BEACON_PROCESSOR_WORK_EVENTS_STARTED_COUNT,
            &[work_id],
        );

        metrics::inc_gauge_vec(
            &metrics::BEACON_PROCESSOR_WORKERS_ACTIVE_GAUGE_BY_TYPE,
            &[work_id],
        );

        // Wrap the `idle_tx` in a struct that will fire the idle message whenever it is dropped.
        //
        // This helps ensure that the worker is always freed in the case of an early exit or panic.
        // As such, this instantiation should happen as early in the function as possible.
        let send_idle_on_drop = SendOnDrop {
            tx: idle_tx,
            work_type,
            _worker_timer: worker_timer,
        };

        let worker_id = self.current_workers;
        self.current_workers = self.current_workers.saturating_add(1);

        let executor = self.executor.clone();

        trace!(
            work = work_id,
            worker = worker_id,
            "Spawning beacon processor worker"
        );

        let task_spawner = TaskSpawner {
            executor,
            send_idle_on_drop,
        };

        // Dispatch the work to the appropriate execution strategy (blocking thread,
        // async task, or rayon pool). The `task_spawner` owns the `SendOnDrop`, so
        // the worker is freed when whichever task runs completes (or panics).
        match work {
            Work::GossipAttestation {
                attestation,
                process_individual,
                process_batch: _,
            } => task_spawner.spawn_blocking(move || {
                process_individual(*attestation);
            }),
            Work::GossipAttestationBatch {
                attestations,
                process_batch,
            } => task_spawner.spawn_blocking(move || {
                process_batch(attestations);
            }),
            Work::GossipAggregate {
                aggregate,
                process_individual,
                process_batch: _,
            } => task_spawner.spawn_blocking(move || {
                process_individual(*aggregate);
            }),
            Work::GossipAggregateBatch {
                aggregates,
                process_batch,
            } => task_spawner.spawn_blocking(move || {
                process_batch(aggregates);
            }),
            Work::ChainSegment(process_fn) => task_spawner.spawn_async(async move {
                process_fn.await;
            }),
            Work::UnknownBlockAttestation { process_fn }
            | Work::UnknownBlockAggregate { process_fn }
            | Work::UnknownLightClientOptimisticUpdate { process_fn, .. } => {
                task_spawner.spawn_blocking(process_fn)
            }
            Work::DelayedImportBlock {
                beacon_block_slot: _,
                beacon_block_root: _,
                process_fn,
            } => task_spawner.spawn_async(process_fn),
            Work::RpcBlock { process_fn }
            | Work::RpcBlobs { process_fn }
            | Work::RpcCustodyColumn(process_fn)
            | Work::ColumnReconstruction(process_fn) => task_spawner.spawn_async(process_fn),
            Work::IgnoredRpcBlock { process_fn } => task_spawner.spawn_blocking(process_fn),
            Work::GossipBlock(work)
            | Work::GossipBlobSidecar(work)
            | Work::GossipDataColumnSidecar(work) => task_spawner.spawn_async(async move {
                work.await;
            }),
            Work::BlobsByRangeRequest(process_fn)
            | Work::BlobsByRootsRequest(process_fn)
            | Work::DataColumnsByRootsRequest(process_fn)
            | Work::DataColumnsByRangeRequest(process_fn) => {
                task_spawner.spawn_blocking(process_fn)
            }
            Work::BlocksByRangeRequest(work) | Work::BlocksByRootsRequest(work) => {
                task_spawner.spawn_async(work)
            }
            Work::ChainSegmentBackfill(process_fn) => {
                if self.config.enable_backfill_rate_limiting {
                    task_spawner.spawn_blocking_with_rayon(RayonPoolType::LowPriority, process_fn)
                } else {
                    // use the global rayon thread pool if backfill rate limiting is disabled.
                    task_spawner.spawn_blocking(process_fn)
                }
            }
            Work::ApiRequestP0(process_fn) | Work::ApiRequestP1(process_fn) => match process_fn {
                BlockingOrAsync::Blocking(process_fn) => task_spawner.spawn_blocking(process_fn),
                BlockingOrAsync::Async(process_fn) => task_spawner.spawn_async(process_fn),
            },
            Work::GossipVoluntaryExit(process_fn)
            | Work::GossipProposerSlashing(process_fn)
            | Work::GossipAttesterSlashing(process_fn)
            | Work::GossipSyncSignature(process_fn)
            | Work::GossipSyncContribution(process_fn)
            | Work::GossipLightClientFinalityUpdate(process_fn)
            | Work::GossipLightClientOptimisticUpdate(process_fn)
            | Work::Status(process_fn)
            | Work::GossipBlsToExecutionChange(process_fn)
            | Work::LightClientBootstrapRequest(process_fn)
            | Work::LightClientOptimisticUpdateRequest(process_fn)
            | Work::LightClientFinalityUpdateRequest(process_fn)
            | Work::LightClientUpdatesByRangeRequest(process_fn) => {
                task_spawner.spawn_blocking(process_fn)
            }
            // No worker task is spawned for reprocess messages; dropping the
            // `task_spawner` here immediately frees the worker slot.
            Work::Reprocess(_) => {}
        };
    }
}

/// Spawns tasks that are either:
///
/// - Blocking (i.e. intensive methods that shouldn't run on the core `tokio` executor)
/// - Async (i.e. `async` methods)
///
/// Takes a `SendOnDrop` and ensures it is dropped after the task completes. This frees the beacon
/// processor worker so a new task can be started.
struct TaskSpawner {
    // Executor on which the worker task is actually spawned.
    executor: TaskExecutor,
    // Dropped once the spawned task finishes, signalling the manager that this worker is idle.
    send_idle_on_drop: SendOnDrop,
}

impl TaskSpawner {
    /// Run `task` as an async task on the executor, freeing the worker (by dropping
    /// the contained `SendOnDrop`) once the future resolves.
    fn spawn_async(self, task: impl Future<Output = ()> + Send + 'static) {
        let Self {
            executor,
            send_idle_on_drop,
        } = self;
        executor.spawn(
            async move {
                task.await;
                // Signal that this worker is idle again.
                drop(send_idle_on_drop);
            },
            WORKER_TASK_NAME,
        )
    }

    /// Run `task` on a blocking thread, freeing the worker once it returns.
    fn spawn_blocking<F>(self, task: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let Self {
            executor,
            send_idle_on_drop,
        } = self;
        executor.spawn_blocking(
            move || {
                task();
                // Signal that this worker is idle again.
                drop(send_idle_on_drop);
            },
            WORKER_TASK_NAME,
        )
    }

    /// Run `task` on the requested rayon thread pool, freeing the worker once it returns.
    fn spawn_blocking_with_rayon<F>(self, rayon_pool_type: RayonPoolType, task: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let Self {
            executor,
            send_idle_on_drop,
        } = self;
        executor.spawn_blocking_with_rayon(
            move || {
                task();
                // Signal that this worker is idle again.
                drop(send_idle_on_drop);
            },
            rayon_pool_type,
            WORKER_TASK_NAME,
        )
    }
}

/// This struct will send a message on `self.tx` when it is dropped. An error will be logged
/// if the send fails (this happens when the node is shutting down).
///
/// ## Purpose
///
/// This is useful for ensuring that a worker-freed message is still sent if a worker panics.
///
/// The Rust docs for `Drop` state that `Drop` is called during an unwind in a panic:
///
/// https://doc.rust-lang.org/std/ops/trait.Drop.html#panics
pub struct SendOnDrop {
    // Channel back to the manager; the work type is sent on drop to mark the worker idle.
    tx: mpsc::Sender<WorkType>,
    // The type of work this worker was processing, echoed back on drop.
    work_type: WorkType,
    // The field is unused, but it's here to ensure the timer is dropped once the task has finished.
    _worker_timer: Option<metrics::HistogramTimer>,
}

impl Drop for SendOnDrop {
    fn drop(&mut self) {
        // This worker is no longer active for its work type.
        metrics::dec_gauge_vec(
            &metrics::BEACON_PROCESSOR_WORKERS_ACTIVE_GAUGE_BY_TYPE,
            &[self.work_type.clone().into()],
        );

        // Tell the manager the worker is free. A send failure generally means the
        // receiving end has already gone away (node shutdown).
        match self.tx.try_send(self.work_type.clone()) {
            Ok(()) => (),
            Err(e) => warn!(
                msg = "did not free worker, shutdown may be underway",
                error = %e,
                "Unable to free worker"
            ),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use types::{BeaconState, ChainSpec, Eth1Data, ForkName, MainnetEthSpec};

    /// With an empty validator registry, the validator-count-derived queue lengths
    /// should clamp to `MIN_QUEUE_LEN`.
    #[test]
    fn min_queue_len() {
        let spec = ForkName::latest_stable().make_genesis_spec(ChainSpec::mainnet());
        // Genesis state with zero validators (genesis time of 0).
        let genesis_state =
            BeaconState::<MainnetEthSpec>::new(0, Eth1Data::default(), &spec);
        assert_eq!(genesis_state.validators().len(), 0);

        let lengths = BeaconProcessorQueueLengths::from_state(&genesis_state, &spec).unwrap();
        assert_eq!(lengths.attestation_queue, MIN_QUEUE_LEN);
        assert_eq!(lengths.unknown_block_attestation_queue, MIN_QUEUE_LEN);
    }
}
